package per.hmm.kakfa.producer;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/**
 * User: Administrator
 * Date: 2021/1/7
 * Time: 15:25
 * Description:
 * Kafka producer interceptor that prepends a fixed prefix to every record value and
 * keeps a running count of successful and failed sends so that the overall success
 * ratio can be printed when the interceptor is closed.
 *
 * <p>The counters are {@link AtomicLong}s because the Kafka client may invoke the
 * interceptor callbacks from more than one thread, and a plain {@code volatile long}
 * incremented with {@code ++} would lose updates under contention.
 */
public class ProducerInterceptPrefix implements ProducerInterceptor<String, String> {

    /** Text prepended to the value of every non-null record value. */
    private static final String VALUE_PREFIX = "prefix";

    /** Number of acknowledgements received without an exception. */
    private final AtomicLong sendSuccess = new AtomicLong();

    /** Number of acknowledgements received with an exception. */
    private final AtomicLong sendFailed = new AtomicLong();

    /**
     * Intercepts a record before it is sent and rewrites its value with the prefix.
     *
     * <p>Topic, partition, timestamp, key and headers are copied from the incoming
     * record unchanged. A record whose value is {@code null} is returned as-is:
     * concatenating would turn it into the literal string {@code "prefixnull"} and
     * the record would no longer carry a null value.
     *
     * @param record the record handed to the producer
     * @return a new record carrying the prefixed value, or {@code record} itself when
     *         its value is {@code null}
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        String originalValue = record.value();
        if (originalValue == null) {
            return record;
        }
        String modifyValue = VALUE_PREFIX + originalValue;
        return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), record.key(), modifyValue, record.headers());
    }

    /**
     * Records the outcome of a send and prints a short status line.
     *
     * @param metadata  metadata of the record that was sent; not read by this method
     * @param exception the failure raised while sending, or {@code null} on success
     */
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            System.out.println("拦截器中打印：发送 成功 >>>>>>>>>>>>");
            sendSuccess.incrementAndGet();
        } else {
            System.out.println("拦截器中打印：发送 失败 >>>>>>>>>>>>");
            sendFailed.incrementAndGet();
        }
    }

    /**
     * Prints the ratio of successful sends to all acknowledged sends.
     *
     * <p>When nothing was acknowledged the ratio is reported as {@code 0.0} rather
     * than the {@code NaN} that a 0/0 floating-point division would produce.
     */
    @Override
    public void close() {
        long succeeded = sendSuccess.get();
        long total = succeeded + sendFailed.get();
        double ratio = total == 0 ? 0.0 : (double) succeeded / total;
        System.out.println(">>>>>>>>>>>>发送成功率：" + ratio + ">>>>>>>>>>>>");
    }

    /**
     * No configuration is consumed by this interceptor.
     *
     * @param configs the producer configuration; ignored
     */
    @Override
    public void configure(Map<String, ?> configs) {
        // Intentionally empty: the prefix is fixed and no settings are read.
    }
}
